iT邦幫忙

2023 iThome 鐵人賽

DAY 30
0
Software Development

Rust Web API 從零開始系列 第 30

Day30 - 附錄: Rust中的非同步程式設計(4)

  • 分享至 

  • xImage
  •  

昨天我們看到rust的async/await會在編譯時期把非同步方法實作成Future物件,並且把所有的Future物件組織成樹狀資料結構,使得runtime可以針對排列好的Future物件順序執行。但這樣就有了另一個問題,通常我們在寫非同步程式時並不希望一項任務結束才進行下一項任務,而是希望同時能夠等待多個任務的結果。因為狀態樹是基於Future物件的組合,而上層的Future並不知曉下層的Future實現,因此我們可以把並行的任務隱藏在一個組合用的Future物件背後。

Join!

rust中的幾個函式庫都有提供join!巨集,目的是用來組合多個Future,並且等到全部的Future都完成後回傳結果,而它背後的原理就是產生一個包裹用的物件,假設我們有一個結構是用來執行兩個任務,用來等待兩個同時開始的非同步任務都完成:

struct JoinTwo {
    a: dyn Future<Output = ()>,
    b: dyn Future<Output = ()>,
}

上面的JoinTwo裡面組合了兩個實做Future特徵的物件,關聯型別是()表示這兩個任務都沒有回傳值,這時候會編譯不過,因為結構體的成員需要是sized的,所以需要改一下上面的程式:

struct JoinTwo {
    a: Box<dyn Future<Output = ()>>,
    b: Box<dyn Future<Output = ()>>,
}

目前編譯過了,那麼接下來要幫JoinTwo實做Future:

impl Future for JoinTwo {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    
        /// 同時呼叫a跟b的poll並檢查是不是都ready了
        if self.a.poll(cx).is_ready()
            && self.b.poll(cx).is_ready() {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

這時候編譯器就報錯了:

error[E0599]: no method named `poll` found for struct `Box<(dyn Future<Output = ()> + 'static)>` in the current scope
  --> src/main.rs:46:19
   |
46 |         if self.a.poll(cx) == Poll::Ready(())
   |                   ^^^^ method not found in `Box<dyn Future<Output = ()>>`

這是因為self.aself.b的型別並沒有滿足poll的需求Pin<&mut Self>,我們先快速的把程式碼改一下:

impl Future for JoinTwo {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let a = Pin::new(&mut self.a);
        let b = Pin::new(&mut self.b);

        if a.poll(cx).is_ready()
            && b.poll(cx).is_ready() {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

這次還是無法編譯過,編譯器告訴我們這是因為ab並沒有滿足Unpin這個特徵。

error[E0599]: the method `poll` exists for struct `Pin<&mut Box<dyn Future<Output = ()>>>`, but its trait bounds were not satisfied
   --> src/main.rs:49:14
    |
49  |           if a.poll(cx) == Poll::Ready(())
    |                ^^^^ method cannot be called on `Pin<&mut Box<dyn Future<Output = ()>>>` due to unsatisfied trait bounds
    |
   ::: /home/marvinhsu/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/future/future.rs:37:1
    |
37  |   pub trait Future {
    |   ---------------- doesn't satisfy `(dyn Future<Output = ()> + 'static): Unpin`
    |
   ::: /home/marvinhsu/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/boxed.rs:195:1
    |
195 | / pub struct Box<
196 | |     T: ?Sized,
197 | |     #[unstable(feature = "allocator_api", issue = "32838")] A: Allocator = Global,
198 | | >(Unique<T>, A);
    | |_- doesn't satisfy `Box<(dyn Future<Output = ()> + 'static)>: Future`
    |
    = note: the following trait bounds were not satisfied:
            `(dyn Future<Output = ()> + 'static): Unpin`
            which is required by `Box<(dyn Future<Output = ()> + 'static)>: Future`

await與狀態切割

先拉回到昨天的程式碼,我們建立了這樣的結構:

pub struct MainFuture
{
    state: MainState
}

enum MainState{
  MainStart,
  Task1Running(Task1Future),
  Task2Running(Task2Future),
  MainComplete
}

但通常情況下得程式碼不可能這麼簡單,每遇到一段await,編譯器就要把方法的內容切開來,那麼如果出現下面這段程式碼:

let mut a = 1;
/// -----斷開鎖鏈-----
task1.await;
/// -----斷開鎖鏈-----
a = a + 1;
/// -----斷開鎖鏈-----
task2.await;
/// -----斷開鎖鏈-----
a = a + 1;

await切割的每個區段都是獨立彼此無法互相知曉的,因此需要把每個階段的變數都放到狀態機中,然而除非能夠確保變數a的位置是固定不變的,不然很容易改到錯的東西,以上面的例子就會是第一次的a+1暫存在狀態中,因此位置被移動了,第二次的a+1如果不知道而去抓最開始的1,然而這段記憶體已經被回收掉了。

Unpin與!Unpin

那麼要如何保證a的位置不改變呢?只要讓改成以指標指向這個值,以C#的說法就是使用參考,透過參考去取得實例就可以避免實例被移動。這裡要帶出第一個觀念Unpin,這是一個標記用的特徵,暗示了這個可以任意移動,大多數的型別像是i8或是指標型別Box都是Unpin的,所以我們可以隨意的移動這些物件在記憶體中的位置。與之相對就是!Unpin,指的是不能夠任意移動的像是實做Future的物件。

依照目前已知的東西,我們回來看前面遇到的第二個錯誤doesn't satisfy (dyn Future<Output = ()> + 'static): Unpin,這是因為雖然我們取到的self.a是被Box包裝的,但內層包裹了一個Future,也就是說他是!Unpin,那解決方案就是再包裝一層Pin:

struct JoinTwo {
    a: Pin<Box<dyn Future<Output = ()>>>,
    b: Pin<Box<dyn Future<Output = ()>>>,
}

然而編譯器還是告訴我們寫錯了:

error[E0596]: cannot borrow `self` as mutable, as it is not declared as mutable
  --> src/main.rs:45:13
   |
45 |     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
   |             ^^^^ not mutable
46 |         let a = Pin::new(&mut self.a);
   |                               ---- cannot borrow as mutable
47 |         let b = Pin::new(&mut self.b);
   |                               ---- cannot borrow as mutable
   |
help: consider changing this to be mutable
   |
45 |     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
   |      

Pin

前面提到可以使用指標來防止實例被移動,然而這並不是萬全的解方,因為&mut可以透過mem::swap等方法修改指向的位置。有一句話叫做沒有什麼是不能靠再墊一層解決的Pin就是這個再墊一層,思路就是靠這一層來限制對Box的操作,透過對型別的約束來強迫實作方需要好好遵守規則。那麼針對前面not mutable的錯誤,Pin則提供了get_mut()的方法來解決,所以我們得到了完成板的JoinTwo實作:

impl Future for JoinTwo {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let a = Pin::new(&mut this.a);
        let b = Pin::new(&mut this.b);

        if a.poll(cx).is_ready()
            && b.poll(cx).is_ready() {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

這次就通過編譯了,我們可以來試試看自己組好的JoinTwo:

#[tokio::main]
async fn main() {
    let task = JoinTwo {
        a: Box::pin(async { println!("This is task a."); }),
        b: Box::pin(async { println!("This is task b."); }),
    };

    task.await;
}

/// This is task a.
/// This is task b.

小結

透過實作Future,其實我們就是在poll裡面將非同步方法當成同步的方式在操作,因為rust非同步程式中資料結構的設計,我們可以將需要並行或是其他執行順序的操作隱藏在自己實做的Future背後,但這也是為什麼如果不是順序進行的非同步方法,rust中需要透過特殊的標記像是join!或是select!來做到。
PS:rust編譯器真的是好老師

參考資料:Rust Async: Pin概念解析


上一篇
Day29 - 附錄: Rust中的非同步程式設計(3)
系列文
Rust Web API 從零開始30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

1 則留言

0
chihying
iT邦新手 4 級 ‧ 2023-09-30 09:57:48

恭喜完賽啦/images/emoticon/emoticon12.gif

Hell Kiki iT邦新手 4 級 ‧ 2023-09-30 11:16:29 檢舉

恭喜完成

我要留言

立即登入留言